iT邦幫忙

2024 iThome 鐵人賽

DAY 12
0
Software Development

用 NestJS 闖蕩微服務!系列 第 12

[用NestJS闖蕩微服務!] DA12 - Custom Transporter (下)

  • 分享至 

  • xImage
  •  

什麼是 NSQ?

NSQ 是一個高效、可靠的即時分散式訊息處理系統,以去中心化的設計提供了高可用性和水平擴展能力。它適合用於需要低延遲和高吞吐量的訊息傳遞,如:微服務架構。

NSQ Logo

圖片來源

NSQ 採用 Publish/Subscribe 機制,Consumer 會訂閱某個特定的 主題(Topic),當 Producer 發送該 Topic 的 訊息(Message) 時,NSQ 會將訊息傳送給 Consumer。

NSQ Publish/Subscribe Concept

NSQ 的組成

NSQ 由以下三個元件組成:

  • nsqd:用來接收、佇列、傳送 Message 的元件,每個 nsqd 實例都是一個獨立的服務。
  • nsqlookupd:提供 NSQ 的服務發現機制,且會當作 nsqd 與客戶端之間的橋樑,讓 Consumer 可以順利取得來自 Producer 的 Message。
  • nsqadmin:提供對 NSQ Cluster 的監控和管理功能,以 Web 的方式呈現。

Topic

Topic 是用來分類 Message 的, Producer 可以根據 Message 的類型發送至不同的 Topic,Consumer 也可以根據所需來訂閱某些 Topic 的 Message。每一個 Topic 就是一個 流(Stream),它會將 Producer 發送的 Message 進行緩衝,防止大量 Message 阻塞的發生。

從上面的敘述可以得知,Topic 是 Consumer 與 Producer 溝通的介面,格式為字串,限制使用字元 azAZ09-_,長度的部份至少要有 1 個字元,但不得超過 64 字元。下方是幾個 NSQ Topic 的範例:

  • order_created
  • order-update

Channel

在每個 Topic 下,會有一個或多個 通道(Channel), 而一個 Channel 會有一個或多個 Consumer,每當 Producer 發送了一個 Message 時,在該 Topic 下的 所有 Channel 都會獲得該 Message 的副本,Channel 會將該 Message 分配給 其中一個 Consumer,達到負載平衡的效果。

NSQ Channel Concept

注意:如果一個 Topic 底下沒有 Channel,那麼 Message 會 Queue 在 Topic 內,直到 Channel 建立。

安裝 NSQ

在終端機輸入下方指令以便從 Docker Hub 下載 NSQ 的 Docker Image:

$ docker pull nsqio/nsq

我們會用 Docker 來架設三個元件,為了讓它們之間可以進行通訊,故需要建立一個 Docker Bridge Network 來建立通訊的橋樑:

$ docker network create <NETWORK_NAME>

建立好之後,透過下方指令先將 nsqlookupd 架設起來:

$ docker run --name <NSQ_LOOKUPD_NAME> --network <NETWORK_NAME> -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd

架設好 nsqlookupd 之後,透過下方指令將 nsqd 架設起來,並連接到 nsqlookupd:

$ docker run --name <NSQD_NAME> --network <NETWORK_NAME> -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=<NSQD_NAME> --lookupd-tcp-address=<NSQ_LOOKUPD_NAME>:4160

最後,透過下方指令將 nsqadmin 架設起來,並連接到 nsqlookupd:

$ docker run --name <NSQ_ADMIN_NAME> --network <NETWORK_NAME> -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=<NSQ_LOOKUPD_NAME>:4161

操作 NSQ

透過 nsqd 提供的 HTTP API 即可對它進行操作,透過 Postman 進行測試,以 POST 方法存取 http://localhost:4151/pub?topic=test,並傳送 Hello NSQ! 作為請求主體,會看到下方回應顯示 OK

Test Publish NSQ Message Result

透過瀏覽器打開 http://localhost:4171 進入 nsqadmin 的 Web 介面並點選上方「Streams」,會看到「Topic Message Queue」有一個「NSQd Host」且它的「Messages」為 1

nsqadmin Topic Message Queue Result

Custom NSQ Transporter

接下來,我們要運用前一篇的內容來實作 NSQ Transporter,讓我們可以用跟其他 Transporter 一樣的開發風格來使用 NSQ。

前置作業

首先,安裝 NSQ 官方推出的 Node.js Client - nsqjs,我們會基於它來進行實作:

$ npm install nsqjs

另外,還需要安裝它的型別定義檔:

$ npm install @types/nsqjs -D

注意:本篇文章僅會使用部分功能,如果有興趣的話可以參考官方文件

實作 Strategy

注意:本次僅會實作 Event-based 的情境,且實作內容不建議使用於生產環境,僅作為示範使用。

建立一個 NsqServer 繼承 Server 並實作 CustomTransportStrategy

import { Server, CustomTransportStrategy } from '@nestjs/microservices';

export class NsqServer extends Server implements CustomTransportStrategy {
  listen(callback: (...args: unknown[]) => any) {
    callback();
  }

  close() {}
}

接著,定義 NsqServeroptions 參數,名為 NsqOption,它提供建立 Consumer 可以帶入的 ReaderConnectionConfigOptions 選項以及收到訊息後用來反序列化的 deserializer

import { ConsumerDeserializer } from '@nestjs/microservices';
import { ReaderConnectionConfigOptions } from 'nsqjs';

export interface NsqOption {
  deserializer?: ConsumerDeserializer;
  consumer?: ReaderConnectionConfigOptions;
}

我們要讓 NsqServer 在啟動時抓取所有套用 @EventPattern 的 Handler,並根據帶入的 Pattern 來建立對應的 Consumer 實例,但因為建立 Consumer 除了需要 Topic 外,還需要指定 Channel,所以需要定義 @EventPattern 的參數格式:

export interface NsqPattern {
  topic: string;
  channel: string;
}

在使用上就會像下方 AppController 一樣,帶入符合 NsqPattern 的物件:

import { Controller } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  @EventPattern({
    topic: 'order_created',
    channel: 'test',
  })
  orderCreated(data: { msg: string; }) {
    console.log('orderCreated', data);
  }
}

有了取得 Topic 與 Channel 的方式之後,還需要定義 Context 讓處理訊息的 handleEvent 可以接收,這裡我們定義 NsqContext 並提供 getTopicgetChannel 方法:

import { BaseRpcContext } from '@nestjs/microservices';

export class NsqContext extends BaseRpcContext {
  getTopic(): string {
    return this.getArgByIndex(0);
  }

  getChannel(): string {
    return this.getArgByIndex(1);
  }
}

由於 handleEvent 只接收 ReadPacket 格式的訊息,如果使用 Consumer 本身回應的 Message 就不會吻合,所以需要透過 deserializer 來進行反序列化,但原生預設的 deserializerIncomingRequestDeserializer,不太符合我們目前的使用情境,所以需要自行設計一個 deserializer 作為預設值。新增一個 NsqInboundMessageDeserializer 來將 Consumer 回應的 Message 轉換成 IncomingEvent 以符合 ReadPacket 格式:

import { ConsumerDeserializer, IncomingEvent } from '@nestjs/microservices';
import { Message } from 'nsqjs';
import { NsqOption } from './nsq';

export class NsqInboundMessageDeserializer implements ConsumerDeserializer {
  deserialize(message: Message, options?: NsqOption): IncomingEvent {
    return {
      data: message.json(),
      pattern: options,
    };
  }
}

最後,就可以跟 nsqjs 做整合,調整 NsqServer 的內容,定義 generateConsumer 方法來產生 Consumer 實例,再定義 registerConsumer 來讓 Consumer 持續收 Message,一旦收到 Message 就會透過 deserializer 反序列化成內部使用的訊息格式,再透過 handleEvent 將訊息交給 Server 來處理,如此一來,就可以讓符合 Pattern 的 Handler 收到該訊息,最後就是定義 bindEvents 來抓取所有套用 @EventPattern 的 Handler 並產生 Consumer:

import { Server, CustomTransportStrategy, IncomingEvent } from '@nestjs/microservices';
import { Reader } from 'nsqjs';
import { NsqOption, NsqPattern, NsqContext } from './nsq';
import { NsqInboundMessageDeserializer } from './nsq-deserializer';

export class NsqServer extends Server implements CustomTransportStrategy {
  private readonly consumers: Reader[] = [];

  constructor(private readonly options?: NsqOption) {
    super();
    // 使用 `NsqInboundMessageDeserializer` 當作預設值
    this.initializeDeserializer({
      ...this.options,
      deserializer: this.options?.deserializer ?? new NsqInboundMessageDeserializer(),
    });
  }

  listen(callback: (...args: unknown[]) => any) {
    this.bindEvents();
    callback();
  }

  close() {
    // 關閉所有 Consumer
    this.consumers.forEach((consumer) => {
      consumer.close();
    });
  }

  private bindEvents() {
    this.messageHandlers.forEach((handler, pattern) => {
      if (!handler.isEventHandler) {
        return;
      }
      const nsqPattern: NsqPattern = JSON.parse(pattern);
      const consumer = this.generateConsumer(nsqPattern);
      this.registerConsumer(consumer, nsqPattern);
      this.consumers.push(consumer);
    });
  }

  private generateConsumer(params: NsqPattern) {
    const { topic, channel } = params;
    return new Reader(topic, channel, this.options?.consumer);
  }

  private registerConsumer(consumer: Reader, pattern: NsqPattern) {
    consumer.connect();

    consumer.on('message', async (message) => {
      const result = this.deserializer.deserialize(message, pattern) as IncomingEvent;
      const context = new NsqContext([pattern.topic, pattern.channel]);
      await this.handleEvent(JSON.stringify(pattern), result, context);
      message.finish();
    });
  }
}

到目前為止已經完成 Strategy 的部份,可以實際套用來測試看看,在 main.ts 使用該 Strategy,並設定 consumernsqdTCPAddresseslocalhost:4150

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { NsqServer } from './transporters';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    strategy: new NsqServer({
      consumer: {
        nsqdTCPAddresses: ['localhost:4150'],
      },
    }),
  });
  await app.listen();
}
bootstrap();

透過 Postman 進行測試,使用 POST 方法存取 http://localhost:4151/pub?topic=order_created,並指定主體資料為 { "msg": "created" }

NSQ Transporter Strategy Result1

此時會在微服務應用程式的終端機看到收到的訊息內容:

NSQ Transporter Strategy Result2

實作 ClientProxy

建立 NsqClient 並繼承 ClientProxy,根據上一篇內容所述,會需要實作 connectclosepublish 以及 dispatchEvent

import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
import { Writer } from 'nsqjs';

export class NsqClient extends ClientProxy {
  connect(): Promise<Writer> {}

  close() {}

  protected publish(packet: ReadPacket<any>, callback: (packet: WritePacket<any>) => void): () => void {
    throw new Error('no implementation');
  }

  protected dispatchEvent<T = any>(packet: ReadPacket<any>): Promise<T> {}
}

注意:由於我們的範例並沒有實作 Request-response 模式,故 publish 是拋出 Error

如果希望 Producer 可以自行設定連線資訊,以及希望可以設置 serializer,那麼可以設置 NsqOption 當作 NsqClientoptions 參數內容:

import { ProducerSerializer } from '@nestjs/microservices';
import { ConnectionConfigOptions } from 'nsqjs';

export interface NsqOption {
  serializer?: ProducerSerializer;
  producer?: {
    connection?: ConnectionConfigOptions;
    nsqdHost?: string;
    nsqdPort?: number;
  };
}

接著,調整 NsqClient 的內容,在建構階段呼叫 initializeSerializer 來初始化 serializer。由於 ClientProxy 會在每次呼叫 emitsend 時都去執行 connect 來進行連線,所以我們需要在 connect 方法做一些處理,確保 Producer 可以被重用,而不是每次都重新建立連線。當 Producer 實例不存在時,會透過 generateProducer 來產生 Producer 實例並保存起來,再透過 connectToNsq 方法來建立連線;反之,如果存在 Producer 實例,就會直接回傳該實例。當開發者呼叫 NsqClientemit 方法時,會經由 dispatchEvent 來發送訊息,所以我們會在這個方法內透過 normalizePattern 來轉換 Pattern,並透過 serializer 序列化訊息,最後再透過 Producer 的 publish 將訊息發送出去:

import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
import { Writer } from 'nsqjs';
import { NsqOption } from './nsq';

export class NsqClient extends ClientProxy {
  private producer: Writer | null = null;

  constructor(private readonly options?: NsqOption) {
    super();
    this.initializeSerializer({
      ...this.options,
    });
  }

  async connect(): Promise<Writer> {
    if (this.producer) {
      return this.producer;
    }
    this.producer = this.generateProducer();
    return this.connectToNsq(this.producer);
  }

  close() {
    // 關閉 Producer
    this.producer?.close();
  }

  protected publish(packet: ReadPacket<any>, callback: (packet: WritePacket<any>) => void): () => void {
    throw new Error('no implementation');
  }

  protected dispatchEvent<T = any>(packet: ReadPacket<any>): Promise<T> {
    const pattern = this.normalizePattern(packet.pattern);
    const serializedMessage = this.serializer.serialize(packet);
    this.producer?.publish(pattern, serializedMessage.data);
    return Promise.resolve(serializedMessage.data);
  }

  private generateProducer() {
    const host = this.options?.producer?.nsqdHost ?? 'localhost';
    const port = this.options?.producer?.nsqdPort ?? 4150;
    return new Writer(host, port, this.options?.producer?.connection);
  }

  private connectToNsq(producer: Writer) {
    return new Promise<Writer>((resolve, reject) => {
      producer.connect();
      producer.once('ready', () => {
        resolve(producer);
      });
      producer.once('error', (err) => {
        reject(err);
      });
    });
  }
}

已經完成了 NsqClient,現在可以實際套用來測試看看,在 AppModule 透過 ClientsModuleregister 來註冊 NsqClient,這邊我們使用 customClass 來指定要以 NsqClient 來建立實例:

import { Module } from '@nestjs/common';
import { ClientsModule } from '@nestjs/microservices';
import { AppController } from './app.controller';
import { NsqClient } from './clients';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'NSQ_CLIENT',
        customClass: NsqClient,
      },
    ]),
  ],
  controllers: [AppController],
})
export class AppModule {}

接著,調整 AppController 的部份,使用 @Inject 裝飾器注入 NSQ_CLIENT,並建立 createOrder 方法來呼叫 NsqClientemit 方法,Pattern 的部份指定為 order_created,訊息的部份則帶入 { "msg": "Order Created!" }

import { Controller, Post, Inject } from '@nestjs/common';
import { NsqClient } from './clients';

@Controller()
export class AppController {
  constructor(@Inject('NSQ_CLIENT') private readonly nsqClient: NsqClient) {}

  @Post('orders')
  createOrder() {
    const data = { msg: 'Order created!' };
    this.nsqClient.emit('order_created', data);
    return data;
  }
}

透過 Postman 進行測試,以 POST 方法存取 http://localhost:3000/orders,此時會在微服務應用程式的終端機看到由 Client 端傳送的訊息:

NSQ Client Proxy Result

小結

回顧一下今天的重點內容,我們首先簡單介紹了 NSQ 這套高效的訊息處理系統。在了解了相關知識後,我們使用 Custom Transporter 的技巧,設計了一套簡單的 NSQ Transporter。在這個過程中,我們利用 Server 提供的方法來幫助我們更容易地構建策略,也善用了 ClientProxy 的方法來包裝 NsqClient

NestJS 的 Transporter 教學到此告一段落,這幾天我們學習了 NestJS 各式內建的 Transporter,如 Redis、NATS、gRPC 等,甚至還實作了屬於我們自己的 Transporter。這些內容展現了 NestJS 在提升開發體驗方面的努力。下一篇文章,我們將進入 微服務的健康與可觀測性篇,敬請期待!


上一篇
[用NestJS闖蕩微服務!] DAY11 - Custom Transporter (上)
下一篇
[用NestJS闖蕩微服務!] DAY13 - Health Check
系列文
用 NestJS 闖蕩微服務!30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言